package com.huan.flink.source.generator;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * Demonstrates Flink's built-in data-generator source ({@code DataGeneratorSource}).
 *
 * @author huan.fu
 * @date 2023/9/17 - 23:47
 */
public class FlinkGeneratorSourceApplication {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Generator function: turns each sequence index into a timestamped string.
        GeneratorFunction<Long, String> generatorFunction =
                index -> LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)
                        + " generator value: " + index;

        // Source that automatically produces data.
        DataGeneratorSource<String> generatorSource = new DataGeneratorSource<>(
                generatorFunction,
                // Total number of records to emit.
                100,
                // Emission rate: 1 record per second across ALL subtasks combined,
                // not 1 per second per subtask.
                RateLimiterStrategy.perSecond(1),
                // Type of the records produced by the generator.
                Types.STRING
        );

        env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(), "datagen")
                // With parallelism 2 and count 100, each subtask generates
                // count(100)/parallelism(2) = 50 records: one subtask emits [0-49],
                // the other [50-99].
                .setParallelism(2)
                .print();

        env.execute("generator-source");
    }
}
